package com.ideal.hadoopadmin.crontab.changeDataBase;


import com.ideal.hadoopadmin.crontab.db.ConnectionManager;
import com.ideal.hadoopadmin.crontab.entity.Sql;
import com.ideal.hadoopadmin.crontab.hdfs.FlushHDFSInfo;
import com.ideal.hadoopadmin.crontab.hive.FlushHiveInfo;
import com.ideal.hadoopadmin.crontab.kerberos.KerberosAPI;
import com.ideal.hadoopadmin.crontab.tool.Tools;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.jdbc.ScriptRunner;
import com.ideal.hadoopadmin.crontab.property.Properties;
import java.io.*;
import java.sql.*;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Created by 袁颖 on 2016/3/22.
 */
public class ChangeDataBase {
    // Legacy table names captured before the migration; consumed by deleteOldData() and getTablesChange().
    private static List<String> tableNames;
    // Cached rows of the conflict table (cluster_machine); consumed by getMachineType().
    private static List<Map<String, Object>> clusterMachineList;
    private static Logger logger = LoggerFactory.getLogger(ChangeDataBase.class);
    private final static String CONFLICT_TABLE = "cluster_machine";//table that is reused by the new schema and must not be dropped
    // .sql files under <working dir>/DemoFile used for the export/import steps.
    private final static String EXPORT_PATH = System.getProperty("user.dir") + System.getProperty("file.separator")+"DemoFile"+ System.getProperty("file.separator")+"exportDemoSqlNew.sql";
    private final static String IMPORT_PATH = System.getProperty("user.dir") + System.getProperty("file.separator")+"DemoFile"+System.getProperty("file.separator")+"importDate.sql";
    private final static String IMPORTFIRST_PATH = System.getProperty("user.dir") + System.getProperty("file.separator")+"DemoFile"+System.getProperty("file.separator")+"IDT.sql";
    private final static String BAKSQL = System.getProperty("user.dir") + System.getProperty("file.separator")+"DemoFile"+System.getProperty("file.separator")+"bak.sql";
    //May only be run once per server (initChangeDatabase() enforces this via cluster_parameter).
    public void call() {
        //convert the database
        initChangeDatabase();
    }

    //Converts the legacy database to the new schema. Runs the steps in a fixed,
    //order-dependent sequence; aborts if the migration has already been executed.
    private void initChangeDatabase() {
        String saveMachineSql = "select * from cluster_parameter";//probe: cluster_parameter only has rows after a previous run
        List<Map<String,Object>> resParam = ConnectionManager.queryDB(ConnectionManager.getConnection(), saveMachineSql);
        if(resParam.size() !=0 ){
            logger.error("###:Already run ,can only run once!");
            return;
        }
        //test-environment bootstrap (currently disabled)
//        if (importBakDb()) return;
        //archive the legacy tables: dump to disk, record names and row counts
        Map<String, String> odlTables = handleOldTables();
        //initialize the new tables
        List<String> sqlList = new ArrayList<String>();
        initNewTables(sqlList);
        //call the external interfaces (HDFS / Hive / Kerberos)
        callInterface();
        //initialize the remaining tables
        initLastTables(sqlList);
        //drop the legacy tables
        deleteOldData();
        //log row counts of each new table vs. its legacy counterpart
        getTablesChange(odlTables);
    }

    //Imports the backup SQL when running in the demo/test environment.
    //NOTE(review): always returns false, and its only call site is commented out above.
    private boolean importBakDb() {
        if(Tools.DemoChangeDataBase == 1){
            scriptRunFirst(BAKSQL);
        }
        return false;
    }

    //Initializes the new tables. Statement order matters: later statements depend on
    //tables populated by the earlier ones (e.g. cluster_user_kbrconfig needs cluster_user).
    private void initNewTables(List<String> sqlList) {
        scriptRun(IMPORTFIRST_PATH);
        ConnectionManager.exeSQL(Sql.CLUSTER_USER_SQL); // initialize cluster_user
        ConnectionManager.exeSQL(Sql.CLUSTER_MACHINE_SQL); // initialize cluster_machine
        scriptRun(IMPORT_PATH);
        ConnectionManager.exeSQL(Sql.CLUSTER_PARAMETER_SQL); // initialize cluster_parameter
        sqlList.addAll(getClusterUserKbrconfigSql());//initialize cluster_user_kbrconfig
        sqlList.add(Sql.META_HDFS_INFO);
        sqlList.add(Sql.CLUSTER_MACHINE_TYPE);//cluster_machine_type
        sqlList.addAll(getMachineType()); //machineType
        ConnectionManager.exeSQLBatch(sqlList);
    }

    /**
     * Archives the legacy schema before it is replaced: dumps the current database
     * to disk, records every existing table name, counts the rows of each legacy
     * table and caches the conflict table's rows for later reuse.
     *
     * @return map of legacy table name -> row count (string form)
     */
    private Map<String, String> handleOldTables() {
        // Dump the old data to a .sql file so the legacy tables can be dropped later.
        export(EXPORT_PATH);
        // Remember the legacy table names; deleteOldData() consumes this list.
        tableNames = getAllTableNames();
        // Row counts of every legacy table, keyed by table name.
        Map<String, String> legacyCounts = getOldTablesInfo();
        // Cache the conflict table's contents; getMachineType() reads this later.
        String conflictQuery = "select * from " + CONFLICT_TABLE;
        clusterMachineList = ConnectionManager.queryDB(ConnectionManager.getConnection(), conflictQuery);
        return legacyCounts;
    }

    /**
     * Executes the remaining migration statements (generic import SQL plus the
     * meta_hive_access / meta_hdfs_access inserts) as one batch.
     *
     * @param sqlList legacy parameter; its contents are not used (kept for source
     *                compatibility with existing callers)
     */
    private void initLastTables(List<String> sqlList) {
        // Fix: the original cleared the incoming list and then immediately replaced the
        // local reference, so the parameter's contents were never actually used.
        List<String> statements = getSqlList();
        statements.add(Sql.META_HIVE_ACCESS);
        statements.add(Sql.META_HDFS_ACCESS);
        ConnectionManager.exeSQLBatch(statements);
    }

    /**
     * Counts the rows of every legacy table.
     *
     * @return map: legacy table name -> row count (string form of count(*))
     */
    private Map<String, String> getOldTablesInfo() {
        Map<String, String> oldTables = new HashMap<String, String>();
        for (String tab : tableNames) {
            // Fix: the original opened one connection per table and never closed any of
            // them; release each connection when its query is done.
            Connection conn = ConnectionManager.getConnection();
            try {
                List<Map<String, Object>> res = ConnectionManager.queryDB(conn, "select count(*) num from " + tab);
                oldTables.put(tab, res.get(0).get("NUM").toString());
            } finally {
                ConnectionManager.closeConn(conn);
            }
        }
        return oldTables;
    }

    //Calls the external interfaces that populate the interface-backed tables.
    private void callInterface() {
        new FlushHDFSInfo().call();//init meta_hdfs_info_bak -> depends on cluster_user
        new FlushHiveInfo().call();//init meta_hive_info -> depends on cluster_user and meta_hdfs_info_bak
        new KerberosAPI().call();//init cluster_user_kbrauth -> depends on cluster_user_kbrconfig, cluster_user and cluster_machine
    }

    /**
     * Logs the row count of each new table next to the row count of its legacy
     * counterpart (resolved via TableEnum).
     *
     * @param odlTables legacy table name -> row count, produced by getOldTablesInfo()
     */
    private void getTablesChange(Map<String, String> odlTables) {
        tableNames = getAllTableNames();//re-read: now contains the NEW tables
        Map<String, String> newTables = new HashMap<String, String>();
        for (String tab : tableNames) {
            // Fix: the original opened one connection per table and never closed any.
            Connection conn = ConnectionManager.getConnection();
            try {
                List<Map<String, Object>> res = ConnectionManager.queryDB(conn, "select count(*) num from " + tab);
                newTables.put(tab, res.get(0).get("NUM").toString());
            } finally {
                ConnectionManager.closeConn(conn);
            }
        }
        for (Map.Entry<String, String> entry : newTables.entrySet()) {
            logger.info("新 " + entry.getKey() + " 表有 "+entry.getValue()+" 条数据！对应旧表 "+ TableEnum.getTab(entry.getKey())+" "+odlTables.get(TableEnum.getTab(entry.getKey()))+" 条！");
        }
    }

    //Manual entry point for running the migration directly (outside the crontab).
    public static void main(String[] args) throws IOException, SQLException {
        new ChangeDataBase().initChangeDatabase();
    }

    /**
     * Dumps the web-app database to a .sql file via mysqldump.
     * NOTE(review): the logged command contains the DB password in clear text —
     * consider redacting it.
     *
     * @param exportPath absolute path of the dump file to create
     */
    public void export(String exportPath) {
        try {
            Properties properties = Properties.instance(false);
            String command = getExportCommand(properties, exportPath);
            // Fix: wait for mysqldump to finish. The original logged success while the
            // dump was still running, so deleteOldData() could drop tables before the
            // backup was complete.
            Process process = Runtime.getRuntime().exec(command);
            int exitCode = process.waitFor();
            if (exitCode == 0) {
                logger.info("导出数据库成功,文件路径为:" + exportPath + "\n执行cmd命令:" + command);
            } else {
                logger.error("###ChangeDataBase: Export database error! exit code: " + exitCode);
            }
        } catch (IOException e) {
            logger.error("###ChangeDataBase: Export database error!", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            logger.error("###ChangeDataBase: Export database error!", e);
        }
    }

    /**
     * Builds the mysqldump command line used to export the web-app database.
     * Lower-case -p is the password flag, upper-case -P the port; the spacing of
     * each flag is significant to mysqldump.
     *
     * @param properties web-app configuration source
     * @param exportPath path the dump is written to (-r option)
     * @return the complete mysqldump command string
     */
    private static String getExportCommand(Properties properties, String exportPath) {
        String username = properties.getPropertyByKey(Properties.WEB_APP_DB_USER, "root");
        String password = properties.getPropertyByKey(Properties.WEB_APP_DB_PW, "1234");
        String database = properties.getPropertyByKey(Properties.WEB_APP_DB_DB, "hadoopadminweb_db");
        String host = properties.getPropertyByKey(Properties.WEB_APP_DB_IP, "localhost");
        String port = "3306"; // port is hard-coded
        return "mysqldump -u" + username + " -p" + password
                + " -h" + host + " -P" + port
                + " " + database + " -r " + exportPath;
    }

    /**
     * Lists every table name in the current schema via JDBC metadata.
     *
     * @return table names (possibly empty on error; never null)
     */
    public List<String> getAllTableNames() {
        List<String> tableNames = new ArrayList<String>();
        Connection conn = null;
        ResultSet rest = null;
        try {
            conn = ConnectionManager.getConnection();
            Properties properties = Properties.instance(false);
            DatabaseMetaData dbmd = conn.getMetaData();
            // NOTE(review): the schema pattern is read from the DB *user* property with a
            // "localhost" default, which looks inconsistent — confirm the intended key.
            rest = dbmd.getTables(conn.getCatalog(), properties.getPropertyByKey(Properties.WEB_APP_DB_USER, "localhost"), null, new String[]{"TABLE"});
            while (rest.next()) {
                tableNames.add(rest.getString("TABLE_NAME"));
            }
        } catch (SQLException e) {
            logger.error("###ChangeDataBase: get database tables error!", e);
        } finally {
            // Fix: the original never closed the ResultSet, and leaked the connection
            // when getMetaData()/getTables() threw.
            if (rest != null) {
                try { rest.close(); } catch (SQLException ignored) { /* best effort */ }
            }
            if (conn != null) {
                ConnectionManager.closeConn(conn);
            }
        }
        return tableNames;
    }

    /**
     * Executes the SQL script at importPath against a fresh connection.
     * No-op (with a log entry) when the file is missing.
     *
     * @param importPath path of the .sql script to run
     */
    public static void scriptRun(String importPath) {
        if (cheackFilePath(importPath)) return;
        Connection connection = ConnectionManager.getConnection();
        Reader reader = null;
        try {
            try {
                reader = new BufferedReader(new FileReader(importPath));
            } catch (FileNotFoundException e) {
                logger.info("###ChangeDataBase: scriptRun read importPath error, please check importPath!");
                return; // connection released in finally (the original leaked it here)
            }
            new ScriptRunner(connection).runScript(reader);
        } finally {
            // Fix: the original never closed the reader, and leaked the connection on
            // the FileNotFoundException early return.
            if (reader != null) {
                try { reader.close(); } catch (IOException ignored) { /* best effort */ }
            }
            ConnectionManager.closeConn(connection);
        }
    }

    /**
     * Test-environment variant: reads importPath, skips comment lines, splits the
     * remaining text into statements on "///" and executes them as one batch.
     *
     * @param importPath path of the backup .sql script
     */
    public static void scriptRunFirst(String importPath) {
        if (cheackFilePath(importPath)) return;
        Connection connection = ConnectionManager.getConnection();
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(importPath));
            StringBuilder sql = new StringBuilder();
            // Skip comment lines (#, /*, -); concatenate everything else.
            String line = reader.readLine();
            while (line != null) {
                if (!line.startsWith("#") && !line.startsWith("/*") && !line.startsWith("-") && !line.startsWith("\n"))
                    sql.append(line);
                line = reader.readLine();
            }
            // Statements in the script are delimited by "///".
            List<String> statements = new ArrayList<String>(Arrays.asList(sql.toString().split("///")));
            ConnectionManager.exeSQLBatch(statements);
        } catch (IOException e) {
            logger.info("###ChangeDataBase: init bakdb error, please check !");
        } finally {
            // Fix: the original never closed the reader.
            if (reader != null) {
                try { reader.close(); } catch (IOException ignored) { /* best effort */ }
            }
            ConnectionManager.closeConn(connection);
        }
    }

    /**
     * Checks whether the script file is missing.
     *
     * @param importPath path to verify
     * @return true (meaning "abort") when the file does not exist, false otherwise
     */
    private static boolean cheackFilePath(String importPath) {
        // Fix: the original called file.delete() on a file that does not exist — a
        // pointless no-op that only obscured the intent.
        if (!new File(importPath).exists()) {
            logger.info("###ChangeDataBase: importPath error, please check importPath!");
            return true;
        }
        return false;
    }

    /**
     * Collects the canned data-import statements defined in {@link Sql}.
     *
     * @return a fresh, mutable list containing Sql.getSqlList()'s statements
     */
    public List<String> getSqlList() {
        return new ArrayList<String>(Sql.getSqlList());
    }

    /**
     * Builds the multi-row INSERT that seeds cluster_user_kbrconfig from the legacy
     * kdc_config table.
     *
     * @return a single-element list holding the INSERT, or an empty list when
     *         kdc_config has no rows or a lookup fails
     */
    public List<String> getClusterUserKbrconfigSql() {
        String sql = "select * from kdc_config";
        //read all rows of the legacy kdc_config table
        List<Map<String, Object>> mapList = ConnectionManager.queryDB(ConnectionManager.getConnection(), sql);
        if (mapList.size() == 0) {
            logger.info("###ChangeDataBase: kdc_config table is null error!");
            return new ArrayList<String>();
        }
        StringBuilder kbrconfigSql = new StringBuilder("insert into cluster_user_kbrconfig(id, userId, validDay, machineIds, status, changeTime, createTime, startTime, endTime) VALUES");
        int count = 1;
        try {
            for (Map<String, Object> map : mapList) {
                int id = Integer.parseInt(map.get("ID").toString());
                Long userId = getNewUserIdByOld((String) map.get("HP_USER_ID"));
                int validDay = map.get("VALID_DAY") == null ? 0 : Integer.parseInt(map.get("VALID_DAY").toString());
                String ips = null == map.get("MC_IP") ? "" : map.get("MC_IP").toString();
                String machineIds = StringUtils.isBlank(ips) ? "" : getMachineIdsByMachineIps(ips);
                int status = (Integer.parseInt(map.get("STATUS").toString())) == 1 ? 1 : 0;
                long changeTime = System.currentTimeMillis();
                long createTime = map.get("CREATE_TIME") == null ? System.currentTimeMillis() : ((Timestamp) map.get("CREATE_TIME")).getTime();
                long startTime = map.get("START_TIME") == null ? System.currentTimeMillis() : ((Timestamp) map.get("START_TIME")).getTime();
                Timestamp end = (Timestamp) map.get("END_TIME");
                long endTime = end == null ? System.currentTimeMillis() : end.getTime();
                //append one VALUES tuple; the last row has no trailing comma
                kbrconfigSql.append("(").append(id).append(",").append(userId).append(",").append(validDay)
                        .append(",'").append(machineIds).append("',").append(status).append(",").append(changeTime)
                        .append(",").append(createTime).append(",").append(startTime).append(",").append(endTime)
                        .append(count == mapList.size() ? ")" : "),");
                count++;
            }
        } catch (SQLException e) {
            // Fix: the original fell through and returned a truncated, syntactically
            // invalid INSERT (possibly ending in a comma); abort instead.
            logger.error("###ChangeDataBase: getClusterUserKbrconfigSql method error!", e);
            return new ArrayList<String>();
        }
        List<String> sqlList = new ArrayList<String>();
        sqlList.add(kbrconfigSql.toString());
        return sqlList;
    }

    /**
     * Resolves the new cluster_user id for a legacy hadoopuser hp_user_id.
     *
     * @param oldUserId legacy hp_user_id value
     * @return the new userId, or null when no match exists or the lookup fails
     * @throws SQLException declared for source compatibility; errors are caught and logged
     */
    public static Long getNewUserIdByOld(String oldUserId) throws SQLException {
        Long userId = null;
        Connection connection = null;
        try {
            connection = ConnectionManager.getConnection();
            // Fix: parameterized query instead of string concatenation (SQL injection /
            // quoting bugs), and the statement/result set are now closed.
            String sql = "select cu.id\n" +
                    "from hadoopuser h\n" +
                    "join cluster_user cu\n" +
                    "on h.hp_user_name=cu.userName\n" +
                    "where h.hp_user_id=?";
            PreparedStatement statement = connection.prepareStatement(sql);
            try {
                statement.setString(1, oldUserId);
                ResultSet resultSet = statement.executeQuery();
                if (resultSet.next()) {
                    userId = resultSet.getLong("cu.id");
                }
                resultSet.close();
            } finally {
                statement.close();
            }
        } catch (SQLException e) {
            logger.error("cluster_user_kbrconfig表获取userId失败!", e);
        } finally {
            if (connection != null) {
                ConnectionManager.closeConn(connection);
            }
        }
        return userId;
    }

    /**
     * Translates a comma-separated list of machine IPs into the matching
     * machine_info ids (mc_type=3), comma-separated. IPs with no match are skipped.
     *
     * @param machineIps comma-separated IPs (may be blank)
     * @return comma-separated machine ids; empty string when nothing matches
     * @throws SQLException declared for source compatibility; errors are caught and logged
     */
    public static String getMachineIdsByMachineIps(String machineIps) throws SQLException {
        if (StringUtils.isBlank(machineIps)) return "";
        List<String> ids = new ArrayList<String>();
        Connection conn = null;
        try {
            conn = ConnectionManager.getConnection();
            // Fix: parameterized query (the original concatenated the IP into the SQL),
            // one prepared statement reused for all IPs, resources closed.
            PreparedStatement statement = conn.prepareStatement("select id from machine_info where mc_type=3 and mc_ip=?");
            try {
                for (String ip : machineIps.split(",")) {
                    statement.setString(1, ip);
                    ResultSet resultSet = statement.executeQuery();
                    if (resultSet.next()) {
                        ids.add(resultSet.getString("ID"));
                    }
                    resultSet.close();
                }
            } finally {
                statement.close();
            }
        } catch (SQLException e) {
            logger.error("cluster_user_kbrconfig表获取machineIds失败!", e);
        } finally {
            ConnectionManager.closeConn(conn);
        }
        // Fix: joining the found ids avoids the trailing comma the original produced
        // when the last IP had no match.
        return StringUtils.join(ids, ",");
    }

    /**
     * Builds the INSERT that copies the cached cluster_machine rows into the new
     * cluster_cluster_type table.
     *
     * @return single-element list holding the INSERT, or an empty list when no rows are cached
     */
    public static List<String> getMachineType() {
        List<String> clusterMachineSql = new ArrayList<String>();
        // Fix: also guard against null — handleOldTables() may not have populated the
        // cache when this is called out of sequence.
        if (clusterMachineList == null || clusterMachineList.size() == 0) return clusterMachineSql;
        StringBuilder sql = new StringBuilder("INSERT INTO `cluster_cluster_type` (`id`, `clusterTypeId`, `clusterTypeName`, `note`) VALUES");
        int count = 1;
        for (Map<String, Object> map : clusterMachineList) {
            int id = (Integer) map.get("ID");
            int clusterTypeId = Integer.parseInt(map.get("MACHINE_CLUSTER_TYPE").toString());
            String clusterTypeName = map.get("CLUSTER_NAME").toString();
            //append one VALUES tuple; the last row has no trailing comma
            sql.append(" (").append(id).append(",").append(clusterTypeId).append(", '")
                    .append(clusterTypeName).append("','").append(map.get("NOTE"))
                    .append(count == clusterMachineList.size() ? "')" : "'),");
            count++;
        }
        clusterMachineSql.add(sql.toString());
        return clusterMachineSql;
    }

    /**
     * Drops every legacy table recorded in {@code tableNames}, except the conflict
     * table (cluster_machine) which the new schema reuses.
     */
    public void deleteOldData() {
        Connection connection = null;
        Statement statement = null;
        try {
            connection = ConnectionManager.getConnection();
            statement = connection.createStatement();
            for (String tableName : tableNames) {
                if (!CONFLICT_TABLE.equals(tableName)) {
                    statement.execute("drop table " + tableName);
                }
            }
            logger.info("###ChangeDataBase: delete OldData success!");
        } catch (SQLException e) {
            logger.error("###ChangeDataBase: delete OldData error!", e);
        } finally {
            // Fix: the original nulled the Statement instead of closing it.
            if (statement != null) {
                try { statement.close(); } catch (SQLException ignored) { /* best effort */ }
            }
            ConnectionManager.closeConn(connection);
        }
    }

}
